/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License").  See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership.  You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright (C) 2014 Cloudius Systems, Ltd.
 * Copyright (c) 2022 StoneAtom, Inc. All rights reserved.
 */

#ifndef TIANMU_BASE_SEMAPHORE_H_
#define TIANMU_BASE_SEMAPHORE_H_
#pragma once

#include <exception>
#include <stdexcept>

#include "base/core/chunked_fifo.h"
#include "base/core/expiring_fifo.h"
#include "base/core/future.h"
#include "base/core/timer.h"

namespace Tianmu {
namespace base {

/// \addtogroup fiber-module
/// @{

/// Exception thrown when a semaphore is broken by
/// \ref semaphore::broken().
class broken_semaphore : public std::exception {
 public:
  /// Reports the exception reason.
  const char *what() const noexcept override { return "Semaphore broken"; }
};

/// Exception thrown when a semaphore wait operation
/// times out.
///
/// \see semaphore::wait(typename timer<>::duration timeout, size_t nr)
class semaphore_timed_out : public std::exception {
 public:
  /// Reports the exception reason.
  const char *what() const noexcept override { return "Semaphore timedout"; }
};

/// Exception Factory for standard semaphore
///
/// constructs standard semaphore exceptions
/// \see semaphore_timed_out and broken_semaphore
struct semaphore_default_exception_factory {
  static semaphore_timed_out timeout() { return semaphore_timed_out(); }
  static broken_semaphore broken() { return broken_semaphore(); }
};

/// \brief Counted resource guard.
///
/// This is a standard computer science semaphore, adapted
/// for futures.  You can deposit units into a counter,
/// or take them away.  Taking units from the counter may wait
/// if not enough units are available.
///
/// To support exceptional conditions, a \ref broken() method
/// is provided, which causes all current waiters to stop waiting,
/// with an exceptional future returned.  This allows causing all
/// fibers that are blocked on a semaphore to continue.  This is
/// similar to POSIX's `pthread_cancel()`, with \ref wait() acting
/// as a cancellation point.
///
/// \tparam ExceptionFactory template parameter allows modifying a semaphore to
/// throw customized exceptions on timeout/broken(). It has to provide two
/// static functions ExceptionFactory::timeout() and ExceptionFactory::broken()
/// which return corresponding exception object.
template <typename ExceptionFactory, typename Clock = typename timer<>::clock>
class basic_semaphore {
 public:
  using duration = typename timer<Clock>::duration;
  using clock = typename timer<Clock>::clock;
  using time_point = typename timer<Clock>::time_point;

 private:
  ssize_t _count;
  std::exception_ptr _ex;
  struct entry {
    promise<> pr;
    size_t nr;
    entry(promise<> &&pr_, size_t nr_) : pr(std::move(pr_)), nr(nr_) {}
  };
  struct expiry_handler {
    void operator()(entry &e) noexcept { e.pr.set_exception(ExceptionFactory::timeout()); }
  };
  expiring_fifo<entry, expiry_handler, clock> _wait_list;
  bool has_available_units(size_t nr) const { return _count >= 0 && (static_cast<size_t>(_count) >= nr); }
  bool may_proceed(size_t nr) const { return has_available_units(nr) && _wait_list.empty(); }

 public:
  /// Returns the maximum number of units the semaphore counter can hold
  static constexpr size_t max_counter() { return std::numeric_limits<decltype(_count)>::max(); }

  /// Constructs a semaphore object with a specific number of units
  /// in its internal counter. E.g., starting it at 1 is suitable for use as
  /// an unlocked mutex.
  ///
  /// \param count number of initial units present in the counter.
  basic_semaphore(size_t count) : _count(count) {}
  /// Waits until at least a specific number of units are available in the
  /// counter, and reduces the counter by that amount of units.
  ///
  /// \note Waits are serviced in FIFO order, though if several are awakened
  ///       at once, they may be reordered by the scheduler.
  ///
  /// \param nr Amount of units to wait for (default 1).
  /// \return a future that becomes ready when sufficient units are available
  ///         to satisfy the request.  If the semaphore was \ref broken(), may
  ///         contain an exception.
  future<> wait(size_t nr = 1) { return wait(time_point::max(), nr); }
  /// Waits until at least a specific number of units are available in the
  /// counter, and reduces the counter by that amount of units.  If the request
  /// cannot be satisfied in time, the request is aborted.
  ///
  /// \note Waits are serviced in FIFO order, though if several are awakened
  ///       at once, they may be reordered by the scheduler.
  ///
  /// \param timeout expiration time.
  /// \param nr Amount of units to wait for (default 1).
  /// \return a future that becomes ready when sufficient units are available
  ///         to satisfy the request.  On timeout, the future contains a
  ///         \ref semaphore_timed_out exception.  If the semaphore was
  ///         \ref broken(), may contain an exception.
  future<> wait(time_point timeout, size_t nr = 1) {
    if (may_proceed(nr)) {
      _count -= nr;
      return make_ready_future<>();
    }
    if (_ex) {
      return make_exception_future(_ex);
    }
    promise<> pr;
    auto fut = pr.get_future();
    _wait_list.push_back(entry(std::move(pr), nr), timeout);
    return fut;
  }

  /// Waits until at least a specific number of units are available in the
  /// counter, and reduces the counter by that amount of units.  If the request
  /// cannot be satisfied in time, the request is aborted.
  ///
  /// \note Waits are serviced in FIFO order, though if several are awakened
  ///       at once, they may be reordered by the scheduler.
  ///
  /// \param timeout how long to wait.
  /// \param nr Amount of units to wait for (default 1).
  /// \return a future that becomes ready when sufficient units are available
  ///         to satisfy the request.  On timeout, the future contains a
  ///         \ref semaphore_timed_out exception.  If the semaphore was
  ///         \ref broken(), may contain an exception.
  future<> wait(duration timeout, size_t nr = 1) { return wait(clock::now() + timeout, nr); }
  /// Deposits a specified number of units into the counter.
  ///
  /// The counter is incremented by the specified number of units.
  /// If the new counter value is sufficient to satisfy the request
  /// of one or more waiters, their futures (in FIFO order) become
  /// ready, and the value of the counter is reduced according to
  /// the amount requested.
  ///
  /// \param nr Number of units to deposit (default 1).
  void signal(size_t nr = 1) {
    if (_ex) {
      return;
    }
    _count += nr;
    while (!_wait_list.empty() && has_available_units(_wait_list.front().nr)) {
      auto &x = _wait_list.front();
      _count -= x.nr;
      x.pr.set_value();
      _wait_list.pop_front();
    }
  }

  /// Consume the specific number of units without blocking
  //
  /// Consume the specific number of units now, regardless of how many units are
  /// available in the counter, and reduces the counter by that amount of units.
  /// This operation may cause the counter to go negative.
  ///
  /// \param nr Amount of units to consume (default 1).
  void consume(size_t nr = 1) {
    if (_ex) {
      return;
    }
    _count -= nr;
  }

  /// Attempts to reduce the counter value by a specified number of units.
  ///
  /// If sufficient units are available in the counter, and if no
  /// other fiber is waiting, then the counter is reduced.  Otherwise,
  /// nothing happens.  This is useful for "opportunistic" waits where
  /// useful work can happen if the counter happens to be ready, but
  /// when it is not worthwhile to wait.
  ///
  /// \param nr number of units to reduce the counter by (default 1).
  /// \return `true` if the counter had sufficient units, and was decremented.
  bool try_wait(size_t nr = 1) {
    if (may_proceed(nr)) {
      _count -= nr;
      return true;
    } else {
      return false;
    }
  }
  /// Returns the number of units available in the counter.
  ///
  /// Does not take into account any waiters.
  size_t current() const { return std::max(_count, ssize_t(0)); }

  /// Returns the number of available units.
  ///
  /// Takes into account units consumed using \ref consume() and therefore
  /// may return a negative value.
  ssize_t available_units() const { return _count; }

  /// Returns the current number of waiters
  size_t waiters() const { return _wait_list.size(); }

  /// Signal to waiters that an error occurred.  \ref wait() will see
  /// an exceptional future<> containing a \ref broken_semaphore exception.
  /// The future is made available immediately.
  void broken() { broken(std::make_exception_ptr(ExceptionFactory::broken())); }

  /// Signal to waiters that an error occurred.  \ref wait() will see
  /// an exceptional future<> containing the provided exception parameter.
  /// The future is made available immediately.
  template <typename Exception>
  void broken(const Exception &ex) {
    broken(std::make_exception_ptr(ex));
  }

  /// Signal to waiters that an error occurred.  \ref wait() will see
  /// an exceptional future<> containing the provided exception parameter.
  /// The future is made available immediately.
  void broken(std::exception_ptr ex);

  /// Reserve memory for waiters so that wait() will not throw.
  void ensure_space_for_waiters(size_t n) { _wait_list.reserve(n); }
};

template <typename ExceptionFactory, typename Clock>
inline void basic_semaphore<ExceptionFactory, Clock>::broken(std::exception_ptr xp) {
  _ex = xp;
  _count = 0;
  while (!_wait_list.empty()) {
    auto &x = _wait_list.front();
    x.pr.set_exception(xp);
    _wait_list.pop_front();
  }
}

template <typename ExceptionFactory = semaphore_default_exception_factory, typename Clock = typename timer<>::clock>
class semaphore_units {
  basic_semaphore<ExceptionFactory, Clock> &_sem;
  size_t _n;

 public:
  semaphore_units(basic_semaphore<ExceptionFactory, Clock> &sem, size_t n) noexcept : _sem(sem), _n(n) {}
  semaphore_units(semaphore_units &&o) noexcept : _sem(o._sem), _n(o._n) { o._n = 0; }
  semaphore_units &operator=(semaphore_units &&o) noexcept {
    if (this != &o) {
      this->~semaphore_units();
      new (this) semaphore_units(std::move(o));
    }
    return *this;
  }
  semaphore_units(const semaphore_units &) = delete;
  ~semaphore_units() noexcept {
    if (_n) {
      _sem.signal(_n);
    }
  }
  /// Releases ownership of the units. The semaphore will not be signalled.
  ///
  /// \return the number of units held
  size_t release() { return std::exchange(_n, 0); }
};

/// \brief Take units from semaphore temporarily
///
/// Takes units from the semaphore and returns them when the \ref
/// semaphore_units object goes out of scope. This provides a safe way to
/// temporarily take units from a semaphore and ensure that they are eventually
/// returned under all circumstances (exceptions, premature scope exits, etc).
///
/// Unlike with_semaphore(), the scope of unit holding is not limited to the
/// scope of a single async lambda.
///
/// \param sem The semaphore to take units from
/// \param units  Number of units to take
/// \return a \ref future<> holding \ref semaphore_units object. When the object
/// goes out of scope
///         the units are returned to the semaphore.
///
/// \note The caller must guarantee that \c sem is valid as long as
///      \ref seaphore_units object is alive.
///
/// \related semaphore
template <typename ExceptionFactory, typename Clock = typename timer<>::clock>
future<semaphore_units<ExceptionFactory, Clock>> get_units(basic_semaphore<ExceptionFactory, Clock> &sem,
                                                           size_t units) {
  return sem.wait(units).then([&sem, units] { return semaphore_units<ExceptionFactory, Clock>{sem, units}; });
}

/// \brief Take units from semaphore temporarily with time bound on wait
///
/// Like \ref get_units(basic_semaphore<ExceptionFactory>&, size_t) but when
/// timeout is reached before units are granted throws semaphore_timed_out
/// exception.
///
/// \param sem The semaphore to take units from
/// \param units  Number of units to take
/// \return a \ref future<> holding \ref semaphore_units object. When the object
/// goes out of scope
///         the units are returned to the semaphore.
///
/// \note The caller must guarantee that \c sem is valid as long as
///      \ref seaphore_units object is alive.
///
/// \related semaphore
template <typename ExceptionFactory, typename Clock = typename timer<>::clock>
future<semaphore_units<ExceptionFactory, Clock>> get_units(
    basic_semaphore<ExceptionFactory, Clock> &sem, size_t units,
    typename basic_semaphore<ExceptionFactory, Clock>::time_point timeout) {
  return sem.wait(timeout, units).then([&sem, units] { return semaphore_units<ExceptionFactory, Clock>{sem, units}; });
}

/// \brief Take units from semaphore temporarily with time bound on wait
///
/// Like \ref get_units(basic_semaphore<ExceptionFactory>&, size_t,
/// basic_semaphore<ExceptionFactory>::time_point) but allow the timeout to be
/// specified as a duration.
///
/// \param sem The semaphore to take units from
/// \param units  Number of units to take
/// \param timeout a duration specifying when to timeout the current request
/// \return a \ref future<> holding \ref semaphore_units object. When the object
/// goes out of scope
///         the units are returned to the semaphore.
///
/// \note The caller must guarantee that \c sem is valid as long as
///      \ref seaphore_units object is alive.
///
/// \related semaphore
template <typename ExceptionFactory>
future<semaphore_units<ExceptionFactory>> get_units(basic_semaphore<ExceptionFactory> &sem, size_t units,
                                                    typename basic_semaphore<ExceptionFactory>::duration timeout) {
  return sem.wait(timeout, units).then([&sem, units] { return semaphore_units<ExceptionFactory>{sem, units}; });
}

/// \brief Consume units from semaphore temporarily
///
/// Consume units from the semaphore and returns them when the \ref
/// semaphore_units object goes out of scope. This provides a safe way to
/// temporarily take units from a semaphore and ensure that they are eventually
/// returned under all circumstances (exceptions, premature scope exits, etc).
///
/// Unlike get_units(), this calls the non-blocking consume() API.
///
/// Unlike with_semaphore(), the scope of unit holding is not limited to the
/// scope of a single async lambda.
///
/// \param sem The semaphore to take units from
/// \param units  Number of units to consume
template <typename ExceptionFactory, typename Clock = typename timer<>::clock>
semaphore_units<ExceptionFactory, Clock> consume_units(basic_semaphore<ExceptionFactory, Clock> &sem, size_t units) {
  sem.consume(units);
  return semaphore_units<ExceptionFactory, Clock>{sem, units};
}

/// \brief Runs a function protected by a semaphore
///
/// Acquires a \ref semaphore, runs a function, and releases
/// the semaphore, returning the the return value of the function,
/// as a \ref future.
///
/// \param sem The semaphore to be held while the \c func is
///            running.
/// \param units  Number of units to acquire from \c sem (as
///               with semaphore::wait())
/// \param func   The function to run; signature \c void() or
///               \c future<>().
/// \return a \ref future<> holding the function's return value
///         or exception thrown; or a \ref future<> containing
///         an exception from one of the semaphore::broken()
///         variants.
///
/// \note The caller must guarantee that \c sem is valid until
///       the future returned by with_semaphore() resolves.
///
/// \related semaphore
template <typename ExceptionFactory, typename Func, typename Clock = typename timer<>::clock>
inline futurize_t<std::result_of_t<Func()>> with_semaphore(basic_semaphore<ExceptionFactory, Clock> &sem, size_t units,
                                                           Func &&func) {
  return get_units(sem, units).then([func = std::forward<Func>(func)](auto units) mutable {
    return futurize_apply(std::forward<Func>(func)).finally([units = std::move(units)] {});
  });
}

/// \brief Runs a function protected by a semaphore with time bound on wait
///
/// If possible, acquires a \ref semaphore, runs a function, and releases
/// the semaphore, returning the the return value of the function,
/// as a \ref future.
///
/// If the semaphore can't be acquired within the specified timeout, returns
/// a semaphore_timed_out exception
///
/// \param sem The semaphore to be held while the \c func is
///            running.
/// \param units  Number of units to acquire from \c sem (as
///               with semaphore::wait())
/// \param timeout a duration specifying when to timeout the current request
/// \param func   The function to run; signature \c void() or
///               \c future<>().
/// \return a \ref future<> holding the function's return value
///         or exception thrown; or a \ref future<> containing
///         an exception from one of the semaphore::broken()
///         variants.
///
/// \note The caller must guarantee that \c sem is valid until
///       the future returned by with_semaphore() resolves.
///
/// \related semaphore
template <typename ExceptionFactory, typename Func>
inline futurize_t<std::result_of_t<Func()>> with_semaphore(basic_semaphore<ExceptionFactory> &sem, size_t units,
                                                           typename basic_semaphore<ExceptionFactory>::duration timeout,
                                                           Func &&func) {
  return get_units(sem, units, timeout).then([func = std::forward<Func>(func)](auto units) mutable {
    return futurize_apply(std::forward<Func>(func)).finally([units = std::move(units)] {});
  });
}

/// default basic_semaphore specialization that throws semaphore specific
/// exceptions on error conditions.
using semaphore = basic_semaphore<semaphore_default_exception_factory>;

/// @}

}  // namespace base
}  // namespace Tianmu

#endif  // TIANMU_BASE_SEMAPHORE_H_
